package me.seu.demo.controller;

import lombok.extern.slf4j.Slf4j;
import me.seu.demo.domain.OrderPaidEvent;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.List;

/**
 * RocketMQ API
 *
 * @author liangfeihu
 * @since 2020/4/16 15:24
 */
@Slf4j
@RestController
@RequestMapping("/rocketmq")
public class RocketMqController {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @GetMapping("/send/sync")
    public ResponseEntity sendMsgSynchronously() {
        //send message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");

        //send spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());

        return ResponseEntity.ok("send messgae synchronously success");
    }


    @GetMapping("/send/async")
    public ResponseEntity sendMsgAynchronously() {
        //send messgae asynchronously
        rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult msg) {
                System.out.printf("async onSucess SendResult=%s %n", msg);
            }

            @Override
            public void onException(Throwable err) {
                System.out.printf("async onException Throwable=%s %n", err);
            }

        });

        return ResponseEntity.ok("send messgae asynchronously success");
    }

    @GetMapping("/send/orderly")
    public ResponseEntity sendMsgOrderly() {
        // 设置队列选择器
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                int index = Integer.parseInt(o + "") % list.size();
                return list.get(index);
            }
        });
        // Send messages orderly
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World msg" + i).build(), i + "");
        }

        // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
        // rocketMQTemplate.destroy();

        return ResponseEntity.ok("send messgae orderly success");
    }

    @GetMapping("/send/orderly/one")
    public ResponseEntity sendMsgOrderlyOne() {
        // Send messages orderly
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.syncSendOrderly("orderly-topic-one", MessageBuilder.withPayload("Orderly msg " + i).build(), i + "");
        }

        // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
        // rocketMQTemplate.destroy();

        return ResponseEntity.ok("send messgae orderly one success");
    }
}
